package redis

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"gitee.com/jonneydong/moleculer-go"
	"gitee.com/jonneydong/moleculer-go/serializer"
	"gitee.com/jonneydong/moleculer-go/transit"
	"github.com/go-redis/redis/v8"
	log "github.com/sirupsen/logrus"
)

type RedisTransporter struct {
	ctx           context.Context
	prefix        string
	client        *redis.Client
	opts          *RedisOptions
	logger        *log.Entry
	serializer    serializer.Serializer
	subscriptions []*redis.PubSub
}

type RedisOptions struct {
	Addr           string
	PassWord       string
	DB             int
	Name           string
	Logger         *log.Entry
	Serializer     serializer.Serializer
	ValidateMsg    transit.ValidateMsgFunc
	AllowReconnect bool
	ReconnectWait  time.Duration
	MaxReconnect   int
}

func CreateRedisTransporter(options RedisOptions) transit.Transport {
	return &RedisTransporter{
		opts:          &options,
		logger:        options.Logger,
		serializer:    options.Serializer,
		subscriptions: []*redis.PubSub{},
		ctx:           context.Background(),
	}
}

func (t *RedisTransporter) Connect() chan error {
	endChan := make(chan error)
	go func() {
		t.logger.Debug("Redis Connect() - Addr:", t.opts.Addr, "- PassWord:", t.opts.PassWord, "-DB", t.opts.DB)
		// fmt.Println("t.opts.Addr", t.opts.Addr)
		rdb := redis.NewClient(&redis.Options{
			// 需要修改成你的配置，本地无需修改
			Addr:     t.opts.Addr,
			Password: t.opts.PassWord,
			DB:       t.opts.DB,
			PoolSize: 100,
		})

		_, err := rdb.Ping(context.Background()).Result()

		if err != nil {
			t.logger.Error("Redis Connect() Error:", err.Error(), " - Addr:", t.opts.Addr, "- PassWord:", t.opts.PassWord, "-DB:", t.opts.DB)
			endChan <- errors.New(fmt.Sprint("Redis Connect() Error:", err.Error(), " - Addr:", t.opts.Addr, "- PassWord:", t.opts.PassWord, "-DB:", t.opts.DB))
			return
		}

		t.logger.Info("Connect to - Addr:", t.opts.Addr, "- PassWord:", t.opts.PassWord, "-DB:", t.opts.DB)
		t.client = rdb
		endChan <- nil
	}()
	return endChan
}

func (t *RedisTransporter) Disconnect() chan error {
	endChan := make(chan error)
	go func() {
		if t.client == nil {
			endChan <- nil
			return
		}

		err := t.client.Close()

		if err != nil {
			t.logger.Error("Clone Redis error:", err)
			endChan <- err
		}

		t.client = nil
		endChan <- nil

	}()
	return endChan
}

func (t *RedisTransporter) topicName(command string, nodeID string) string {
	parts := []string{t.prefix, command}
	if nodeID != "" {
		parts = append(parts, nodeID)
	}
	return strings.Join(parts, ".")
}

var ChannelOptions = make([]redis.ChannelOption, 0)

func (t *RedisTransporter) Subscribe(command, nodeID string, handler transit.TransportHandler) {
	if t.client == nil {
		msg := fmt.Sprint("redis.Subscribe() No connection :( -> command: ", command, " nodeID: ", nodeID)
		t.logger.Warn(msg)
		return
	}

	topic := t.topicName(command, nodeID)
	ChannelOptions = append(ChannelOptions, redis.WithChannelSize(100000))
	sub := t.client.Subscribe(t.ctx, topic)

	go func() {
		defer sub.Close()
		for {
			// time.Sleep(2 * time.Second)
			// for msg := range sub.Channel(ChannelOptions...) {
			// 	// fmt.Println("msg.Channel:", msg.Channel, "请求主题:", msg.Payload)
			// 	var data []byte = []byte(msg.Payload)
			// 	payload := t.serializer.BytesToPayload(&data)
			// 	t.logger.Debug(fmt.Sprintf("Incoming %s packet from '%s'", topic, payload.Get("sender").String()))
			// 	handler(payload)
			// }

			select {
			case msg := <-sub.Channel(ChannelOptions...):
				var data []byte = []byte(msg.Payload)
				payload := t.serializer.BytesToPayload(&data)
				t.logger.Debug(fmt.Sprintf("Incoming %s packet from '%s'", topic, payload.Get("sender").String()))
				go handler(payload)
			}

		}
	}()

	t.subscriptions = append(t.subscriptions, sub)
}

func (t *RedisTransporter) Publish(command, nodeID string, message moleculer.Payload) {
	if t.client == nil {
		msg := fmt.Sprint("redis.Publish() No connection :( -> command: ", command, " nodeID: ", nodeID)
		t.logger.Warn(msg)
		return
	}

	topic := t.topicName(command, nodeID)
	t.logger.Debug("Redis.Publish() command: ", command, " topic: ", topic, " nodeID: ", nodeID)
	t.logger.Trace("message: \n", message, "\n - end")

	err := t.client.Publish(t.ctx, topic, t.serializer.PayloadToBytes(message)).Err()
	if err != nil {
		t.logger.Error("Error on publish: error: ", err, " command: ", "command", " topic: ", topic)
	}
}

func (t *RedisTransporter) SetPrefix(prefix string) {
	t.prefix = prefix
}

func (t *RedisTransporter) SetNodeID(nodeID string) {

}

func (t *RedisTransporter) SetSerializer(serializer serializer.Serializer) {
	// Ignored while transporter initialized in pubsub function
}
